package com.kafkaDemo.raw.demo4;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.function.Consumer;

class MyConsumer implements Runnable {
    private static final String TEST_TOPIC = "test1";
    private final KafkaConsumer<String, String> consumer;

    public MyConsumer() {
        Properties props = new Properties();
        //kafka集群信息
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //消费者组名称
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dhy_group");
        //key的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //value的反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //初始化消费者
        consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        consumeTemplate(MyConsumer::printRecord, null);
    }

    /**
     * recordConsumer针对单条数据进行处理，此方法中应该做好异常处理，避免外围的while循环因为异常中断。
     */
    public void consumeTemplate(Consumer<ConsumerRecord<String, String>> recordConsumer, Consumer<KafkaConsumer<String, String>> afterCurrentBatchHandle) {
        consumer.subscribe(Collections.singletonList(TEST_TOPIC));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
                for (ConsumerRecord<String, String> record : records) {
                    recordConsumer.accept(record);
                }
                if (afterCurrentBatchHandle != null) {
                    afterCurrentBatchHandle.accept(consumer);
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void printRecord(ConsumerRecord<String, String> record) {
        System.out.println("topic:" + record.topic()
                + ",partition:" + record.partition()
                + ",offset:" + record.offset()
                + ",key:" + record.key()
                + ",value" + record.value());
        record.headers().forEach(System.out::println);
    }
}
